/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.state;

import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.util.Random;

/**
 * A collection of manual tests that serve to assess the performance of windowed operations. These
 * run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus,
 * these mostly test the performance of the state backend.
 *
 * <p>When doing a release we should manually run theses tests on the version that is to be released
 * and on older version to see if there are performance regressions.
 *
 * <p>When a test is executed it will output how many elements of key {@code "Tuple 0"} have been
 * processed in each window. This gives an estimate of the throughput.
 */
@Ignore
public class ManualWindowSpeedITCase extends AbstractTestBase {

    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        String checkpoints = tempFolder.newFolder().toURI().toString();
        env.setStateBackend(new FsStateBackend(checkpoints));

        env.addSource(new InfiniteTupleSource(1_000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithFsBackendWithLateness() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        String checkpoints = tempFolder.newFolder().toURI().toString();
        env.setStateBackend(new FsStateBackend(checkpoints));

        env.addSource(new InfiniteTupleSource(10_000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .allowedLateness(Time.seconds(1))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackend() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));

        env.addSource(new InfiniteTupleSource(10_000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    @Test
    public void testTumblingIngestionTimeWindowsWithRocksDBBackendWithLateness() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));

        env.addSource(new InfiniteTupleSource(10_000))
                .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .allowedLateness(Time.seconds(1))
                .reduce(
                        new ReduceFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> reduce(
                                    Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
                                    throws Exception {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        })
                .filter(
                        new FilterFunction<Tuple2<String, Integer>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public boolean filter(Tuple2<String, Integer> value) throws Exception {
                                return value.f0.startsWith("Tuple 0");
                            }
                        })
                .print();

        env.execute();
    }

    /**
     * A source that emits elements with a fixed set of keys as fast as possible. Used for rough
     * performance estimation.
     */
    public static class InfiniteTupleSource
            implements ParallelSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private int numKeys;

        private volatile boolean running = true;

        public InfiniteTupleSource(int numKeys) {
            this.numKeys = numKeys;
        }

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
            Random random = new Random(42);
            while (running) {
                Tuple2<String, Integer> tuple =
                        new Tuple2<String, Integer>("Tuple " + (random.nextInt(numKeys)), 1);
                out.collect(tuple);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }

    /**
     * This {@link WatermarkStrategy} assigns the current system time as the event-time timestamp.
     * In a real use case you should use proper timestamps and an appropriate {@link
     * WatermarkStrategy}.
     */
    private static class IngestionTimeWatermarkStrategy<T> implements WatermarkStrategy<T> {

        private IngestionTimeWatermarkStrategy() {}

        public static <T> IngestionTimeWatermarkStrategy<T> create() {
            return new IngestionTimeWatermarkStrategy<>();
        }

        @Override
        public WatermarkGenerator<T> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new AscendingTimestampsWatermarks<>();
        }

        @Override
        public TimestampAssigner<T> createTimestampAssigner(
                TimestampAssignerSupplier.Context context) {
            return (event, timestamp) -> System.currentTimeMillis();
        }
    }
}
